/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.thrift.partial;

import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TType;

import java.lang.StringBuilder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Enables comparison of two TBase instances such that the comparison
 * is limited to the subset of fields defined by the supplied metadata.
 *
 * <p>This comparer is useful when comparing two instances where:
 * <ul>
 *   <li>one is generated by full deserialization.</li>
 *   <li>the other is generated by partial deserialization.</li>
 * </ul>
 *
 * <p>The typical use case is to establish correctness of partial deserialization.
 *
 * @param <T> the type of the thrift objects being compared.
 */
public class PartialThriftComparer<T extends TBase> {

  /** Outcome of the null pre-check that every typed comparison performs first. */
  private enum ComparisonResult {
    // Both operands are non-null; the caller must compare their contents.
    UNKNOWN,
    // Both operands are null.
    EQUAL,
    // Exactly one operand is null.
    NOT_EQUAL
  }

  // Metadata that defines the scope of comparison.
  private final ThriftMetadata.ThriftStruct metadata;

  /**
   * Constructs an instance of {@link PartialThriftComparer}.
   *
   * @param metadata defines the scope of comparison.
   */
  public PartialThriftComparer(ThriftMetadata.ThriftStruct metadata) {
    this.metadata = metadata;
  }

  /**
   * Compares thrift objects {@code t1} and {@code t2} and returns true if they
   * are equal, false otherwise. The comparison is limited to the scope defined
   * by the metadata supplied at construction time.
   *
   * <p>If the objects are not equal then a description of the first difference
   * found is appended to {@code sb}, provided {@code sb} is non-null.
   *
   * @param t1 the first object.
   * @param t2 the second object.
   * @param sb if non-null, results of the comparison are returned in it.
   * @return true if objects are equivalent, false otherwise.
   * @throws UnsupportedOperationException if the metadata contains a field type
   *     that this comparer does not handle.
   */
  public boolean areEqual(T t1, T t2, StringBuilder sb) {
    return this.areEqual(this.metadata, t1, t2, sb);
  }

  /**
   * Dispatches to the comparison routine that matches the thrift type
   * recorded in {@code data}.
   */
  private boolean areEqual(
      ThriftMetadata.ThriftObject data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    byte fieldType = data.data.valueMetaData.type;
    switch (fieldType) {
      case TType.STRUCT:
        return this.areEqual((ThriftMetadata.ThriftStruct) data, o1, o2, sb);

      case TType.LIST:
        return this.areEqual((ThriftMetadata.ThriftList) data, o1, o2, sb);

      case TType.MAP:
        return this.areEqual((ThriftMetadata.ThriftMap) data, o1, o2, sb);

      case TType.SET:
        return this.areEqual((ThriftMetadata.ThriftSet) data, o1, o2, sb);

      case TType.ENUM:
        return this.areEqual((ThriftMetadata.ThriftEnum) data, o1, o2, sb);

      case TType.BOOL:
      case TType.BYTE:
      case TType.I16:
      case TType.I32:
      case TType.I64:
      case TType.DOUBLE:
      case TType.STRING:
        return this.areEqual((ThriftMetadata.ThriftPrimitive) data, o1, o2, sb);

      default:
        throw unsupportedFieldTypeException(fieldType);
    }
  }

  /**
   * Compares two structs. When the metadata lists no fields the structs are
   * compared with {@code equals()}; otherwise only the listed fields are
   * compared, stopping at the first field that differs.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  private boolean areEqual(
      ThriftMetadata.ThriftStruct data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    TBase t1 = (TBase) o1;
    TBase t2 = (TBase) o2;

    if (data.fields.size() == 0) {
      if (t1.equals(t2)) {
        return true;
      }
      appendNotEqual(data, sb, t1, t2, "struct1", "struct2");
      return false;
    }

    for (Object o : data.fields.values()) {
      ThriftMetadata.ThriftObject field = (ThriftMetadata.ThriftObject) o;
      Object f1 = t1.getFieldValue(field.fieldId);
      Object f2 = t2.getFieldValue(field.fieldId);
      // Only the first difference is reported, so there is no point in
      // reading the remaining fields once a mismatch has been found.
      if (!this.areEqual(field, f1, f2, sb)) {
        return false;
      }
    }

    return true;
  }

  /**
   * Compares two primitive values. Binary values are compared by content;
   * everything else is compared with {@code equals()}.
   */
  private boolean areEqual(
      ThriftMetadata.ThriftPrimitive data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    boolean equal = data.isBinary() ? areBinaryFieldsEqual(o1, o2) : o1.equals(o2);
    if (equal) {
      return true;
    }

    appendNotEqual(data, sb, o1, o2, "o1", "o2");
    return false;
  }

  /** Compares two enum values with {@code equals()}. */
  private boolean areEqual(
      ThriftMetadata.ThriftEnum data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    if (o1.equals(o2)) {
      return true;
    }

    appendNotEqual(data, sb, o1, o2, "o1", "o2");
    return false;
  }

  /**
   * Compares two lists element by element, in order, using the element
   * metadata. Stops at the first element that differs.
   */
  @SuppressWarnings("unchecked")
  private boolean areEqual(
      ThriftMetadata.ThriftList data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    List<Object> l1 = (List<Object>) o1;
    List<Object> l2 = (List<Object>) o2;

    if (!checkSizeEquality(data, l1, l2, sb, "list")) {
      return false;
    }

    for (int i = 0; i < l1.size(); i++) {
      if (!this.areEqual(data.elementData, l1.get(i), l2.get(i), sb)) {
        return false;
      }
    }

    return true;
  }

  /**
   * Compares two sets: they must have the same size and every element of the
   * first must be contained in the second. Membership is decided by
   * {@link Set#contains(Object)}, not by the element metadata.
   */
  @SuppressWarnings("unchecked")
  private boolean areEqual(
      ThriftMetadata.ThriftSet data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    Set<Object> s1 = (Set<Object>) o1;
    Set<Object> s2 = (Set<Object>) o2;

    if (!checkSizeEquality(data, s1, s2, sb, "set")) {
      return false;
    }

    for (Object e1 : s1) {
      if (!s2.contains(e1)) {
        appendResult(data, sb, "Element %s in s1 not found in s2", e1);
        return false;
      }
    }

    return true;
  }

  /**
   * Compares two maps: they must have the same number of keys, every key of
   * the first must exist in the second, and the associated values must be
   * equal according to the value metadata.
   */
  @SuppressWarnings("unchecked")
  private boolean areEqual(
      ThriftMetadata.ThriftMap data,
      Object o1,
      Object o2,
      StringBuilder sb) {

    ComparisonResult result = checkNullEquality(data, o1, o2, sb);
    if (result != ComparisonResult.UNKNOWN) {
      return result == ComparisonResult.EQUAL;
    }

    Map<Object, Object> m1 = (Map<Object, Object>) o1;
    Map<Object, Object> m2 = (Map<Object, Object>) o2;

    if (!checkSizeEquality(data, m1.keySet(), m2.keySet(), sb, "map.keySet")) {
      return false;
    }

    for (Map.Entry<Object, Object> entry : m1.entrySet()) {
      Object k1 = entry.getKey();
      if (!m2.containsKey(k1)) {
        appendResult(data, sb, "Key %s in m1 not found in m2", k1);
        return false;
      }

      if (!this.areEqual(data.valueData, entry.getValue(), m2.get(k1), sb)) {
        return false;
      }
    }

    return true;
  }

  /**
   * Compares two binary values by content. Both values are expected to be of
   * the same representation, either {@code byte[]} or {@link ByteBuffer}.
   *
   * @throws UnsupportedOperationException if {@code o1} is neither a
   *     {@code byte[]} nor a {@link ByteBuffer}.
   */
  private boolean areBinaryFieldsEqual(Object o1, Object o2) {
    if (o1 instanceof byte[]) {
      return Arrays.equals((byte[]) o1, (byte[]) o2);
    }

    if (o1 instanceof ByteBuffer) {
      return ((ByteBuffer) o1).compareTo((ByteBuffer) o2) == 0;
    }

    throw new UnsupportedOperationException(
        String.format("Unsupported binary field type: %s", o1.getClass().getName()));
  }

  /**
   * Appends a formatted message, prefixed with the field name taken from
   * {@code data}, to {@code sb}. Does nothing when {@code sb} is null.
   */
  private void appendResult(
      ThriftMetadata.ThriftObject data,
      StringBuilder sb,
      String format,
      Object... args) {
    if (sb != null) {
      String msg = String.format(format, args);
      sb.append(data.fieldId.getFieldName());
      sb.append(" : ");
      sb.append(msg);
    }
  }

  /**
   * Records that two non-null values differ. Short renderings are reported on
   * a single line; longer ones are laid out over several lines.
   */
  private void appendNotEqual(
      ThriftMetadata.ThriftObject data,
      StringBuilder sb,
      Object o1,
      Object o2,
      String o1name,
      String o2name) {

    // Rendering large objects is costly; skip it when nothing will be recorded.
    if (sb == null) {
      return;
    }

    String o1s = o1.toString();
    String o2s = o2.toString();

    if ((o1s.length() + o2s.length()) < 100) {
      appendResult(data, sb, "%s (%s) != %s (%s)", o1name, o1s, o2name, o2s);
    } else {
      appendResult(
          data, sb, "%s != %s\n%s =\n%s\n%s =\n%s\n",
          o1name, o2name, o1name, o1s, o2name, o2s);
    }
  }

  /**
   * Performs the null pre-check shared by all typed comparisons.
   *
   * @return {@code EQUAL} if both values are null, {@code NOT_EQUAL} if exactly
   *     one of them is null (the difference is recorded in {@code sb}), and
   *     {@code UNKNOWN} if both are non-null and their contents still need to
   *     be compared.
   */
  private ComparisonResult checkNullEquality(
      ThriftMetadata.ThriftObject data,
      Object o1,
      Object o2,
      StringBuilder sb) {
    if ((o1 == null) && (o2 == null)) {
      return ComparisonResult.EQUAL;
    }

    // A single null operand is a definite mismatch. Reporting it as such keeps
    // callers from dereferencing the null value.
    if (o1 == null) {
      appendResult(data, sb, "o1 (null) != o2");
      return ComparisonResult.NOT_EQUAL;
    }

    if (o2 == null) {
      appendResult(data, sb, "o1 != o2 (null)");
      return ComparisonResult.NOT_EQUAL;
    }

    return ComparisonResult.UNKNOWN;
  }

  /**
   * Checks that two collections have the same size, recording the sizes in
   * {@code sb} when they differ.
   *
   * @param typeName label used for the collections in the recorded message.
   * @return true if the sizes match, false otherwise.
   */
  private boolean checkSizeEquality(
      ThriftMetadata.ThriftObject data,
      Collection<?> c1,
      Collection<?> c2,
      StringBuilder sb,
      String typeName) {

    if (c1.size() != c2.size()) {
      appendResult(
          data, sb, "%s1.size(%d) != %s2.size(%d)",
          typeName, c1.size(), typeName, c2.size());
      return false;
    }

    return true;
  }

  /**
   * Creates the exception thrown when a thrift field type is not supported.
   *
   * @param fieldType the unsupported {@link TType} value.
   * @return the exception to be thrown by the caller.
   */
  static UnsupportedOperationException unsupportedFieldTypeException(byte fieldType) {
    return new UnsupportedOperationException("field type not supported: '" + fieldType + "'");
  }
}
